import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.util.Collector;


/**
 * Join function that pairs the first field of a movie tuple with the points of its rating.
 *
 * <p>For every (movie, rating) pair handed to {@link #join}, exactly one
 * {@code Tuple2<String, Integer>} is emitted:
 * <ul>
 *   <li>{@code (movie.f0, rating.points)} when {@code rating} is non-null;</li>
 *   <li>{@code (movie.f0, -1)} when {@code rating} is {@code null}.</li>
 * </ul>
 *
 * <p>NOTE(review): a {@code null} rating presumably comes from an outer join in which a movie
 * has no matching rating -- confirm against the job that wires this function in.
 */
public class PointAssigner2
        implements FlatJoinFunction<Tuple2<String, String>, Rating, Tuple2<String, Integer>> {

    /** Points value emitted when the joined rating is {@code null}. */
    private static final int MISSING_RATING_POINTS = -1;

    /**
     * Emits one {@code (movie.f0, points)} tuple for the given pair.
     *
     * @param movie  the movie tuple; only its first field {@code f0} is read
     * @param rating the joined rating, or {@code null} if none is available
     * @param out    collector that receives the single resulting tuple
     */
    @Override
    public void join(Tuple2<String, String> movie, Rating rating, Collector<Tuple2<String, Integer>> out) {
        // NOTE(review): this method used to branch on `rating.points < 10`, but both branches
        // emitted the same tuple, so the threshold had no effect. If low ratings were meant to
        // produce a different value, that logic still needs to be added here.
        final int points = (rating == null) ? MISSING_RATING_POINTS : rating.points;
        out.collect(new Tuple2<String, Integer>(movie.f0, points));
    }

}